#!/usr/bin/python3
# TLP Profiles Daemon (tlp-pd) control program
# tlpctl is an adaptation of powerprofilesctl,
# offers a subset of its commands and is
# supplemented by TLP-specific shortcuts.
#
# Copyright (c) 2025 Thomas Koch <linrunner at gmx.net> and others.
# SPDX-License-Identifier: GPL-3.0-only


import argparse
import errno
import functools
import os
import signal
import subprocess
import sys

from gi.repository import Gio, GLib

# --- Constants
# Program identification strings. PROGRAM_NAME and PROGRAM_DESC are used when
# the argparse parser is built in get_parser().
PROGRAM_NAME = "tlpctl"
PROGRAM_LONG = "TLP Profiles Control"
PROGRAM_DESC = "Control TLP power profiles"
DAEMON_LONG = "TLP Profiles Daemon"

# --- D-Bus constants
# Bus name, interface name and object path used to address the daemon.
# The interface name is deliberately the same string as the bus name.
PD_BUS_NAME = "org.freedesktop.UPower.PowerProfiles"
PD_INTERFACE_NAME = PD_BUS_NAME
PD_OBJECT_PATH = "/org/freedesktop/UPower/PowerProfiles"
# Interface through which properties are read (Get) and written (Set).
PROPERTIES_IFACE = "org.freedesktop.DBus.Properties"


# --- Helper functions
def get_proxy(interface):
    """Return a synchronous D-Bus proxy for the daemon object.

    Connects to the system bus and binds a proxy to PD_BUS_NAME at
    PD_OBJECT_PATH.

    Args:
        interface: Name of the D-Bus interface the proxy should talk to.

    Returns:
        The Gio.DBusProxy created by Gio.DBusProxy.new_sync().
    """
    system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
    proxy_flags = Gio.DBusProxyFlags.NONE
    # Positional arguments: connection, flags, interface info (none), bus
    # name, object path, interface name, cancellable (none).
    return Gio.DBusProxy.new_sync(
        system_bus, proxy_flags, None, PD_BUS_NAME, PD_OBJECT_PATH, interface, None
    )


def get_pd_property(prop):
    """Read one property of the tlp-pd D-Bus object.

    Args:
        prop: Name of the property on PD_INTERFACE_NAME to read.

    Returns:
        The property value as returned by the proxy's Get call.

    On any GLib.Error the error is written to stderr and the program exits
    with status 1.
    """
    try:
        # Creating the proxy is inside the try block on purpose: connecting to
        # the bus can raise GLib.Error as well, and this function is also
        # called from places that are not wrapped by @command.
        proxy = get_proxy(PROPERTIES_IFACE)
        return proxy.Get("(ss)", PD_INTERFACE_NAME, prop)
    except GLib.Error as error:
        sys.stderr.write(f"{error}\n")
        sys.exit(1)


def get_profile_choices():
    """Return the list of profile names found in the daemon's "Profiles" property."""
    supported = get_pd_property("Profiles")
    return [entry["Profile"] for entry in supported]


def list_profiles_brief():
    """Print one line per available profile, prefixing the active one with '*'."""
    available = get_pd_property("Profiles")
    current = get_pd_property("ActiveProfile")

    for name in (entry["Profile"] for entry in available):
        if name == current:
            flag = "*"
        else:
            flag = " "
        print(f"{flag} {name}")


def set_profile(profile):
    """Ask tlp-pd to switch profiles by writing its "ActiveProfile" property.

    Args:
        profile: Name of the profile to activate.
    """
    properties = get_proxy(PROPERTIES_IFACE)
    # The "(ssv)" signature expects the new value wrapped in a variant.
    requested = GLib.Variant.new_string(profile)
    properties.Set("(ssv)", PD_INTERFACE_NAME, "ActiveProfile", requested)


def command(func):
    """Decorate a tlpctl command handler with uniform error reporting.

    The returned wrapper calls ``func`` with the given arguments and passes
    its return value through. A GLib.Error or ValueError raised by ``func``
    is printed to stderr as "Error: <message>" and the program exits with
    status 1.

    Args:
        func: The command handler to wrap.

    Returns:
        The wrapping function; it keeps the name and docstring of ``func``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (GLib.Error, ValueError) as error:
            sys.stderr.write(f"Error: {error}\n")
            sys.exit(1)

    return wrapper


# --- powerprofilesctl-compatible commands
@command
def _version(args):
    """Print the client version followed by the daemon's "Version" property.

    Args:
        args: Parsed command-line arguments (not used).
    """
    # Query the daemon first so nothing is printed if the query fails.
    daemon_version = get_pd_property("Version")
    # "@TLPVER@" looks like a placeholder replaced at build/install time
    # (assumption - the substitution is not visible in this file).
    for label, version in (("Client", "@TLPVER@"), ("Daemon", daemon_version)):
        print(f"{label}: {version}")


@command
def _set_profile(args):
    """Activate the profile named on the command line.

    Args:
        args: Parsed arguments; ``args.profile`` is a list whose first item
            is the profile name.
    """
    requested = args.profile[0]
    set_profile(requested)


@command
def _get(args):
    """Print the value of the daemon's "ActiveProfile" property.

    Args:
        args: Parsed command-line arguments (not used).
    """
    active = get_pd_property("ActiveProfile")
    print(active)


@command
def _list(args):
    """Print a detailed listing of all profiles.

    For every profile the name is printed (active profile marked with '*'),
    followed by its CpuDriver / PlatformDriver entries when present. The
    "performance" profile additionally shows whether performance is degraded.
    The listing ends with the BatteryAware and LogLevel properties.

    Args:
        args: Parsed command-line arguments (not used).
    """
    detail_row = "    %-15s: %s"

    profiles = get_pd_property("Profiles")
    reason = get_pd_property("PerformanceDegraded")
    active_profile = get_pd_property("ActiveProfile")
    # An empty reason string means performance is not degraded.
    degraded_text = f"yes ({reason})" if reason != "" else "no"

    print("Available power profiles (* = active):\n")
    for entry in profiles:
        name = entry["Profile"]
        marker = "*" if name == active_profile else " "
        print(f"{marker} {name}:")
        for driver in ("CpuDriver", "PlatformDriver"):
            if driver in entry:
                print(detail_row % (driver, entry[driver]))
        if name == "performance":
            print(detail_row % ("Degraded", degraded_text))
        print("")

    # The label is emitted before the property is queried, as two prints.
    print("Dynamic changes from charger and battery events: ", end="")
    print("yes" if get_pd_property("BatteryAware") else "no")

    log_level = get_pd_property("LogLevel")
    print(f"tlp-pd LogLevel: {log_level}")


@command
def _list_holds(_args):
    """Print every entry of the daemon's "ActiveProfileHolds" property.

    Each hold is shown with its profile, application ID and reason; holds are
    separated by a blank line.

    Args:
        _args: Parsed command-line arguments (not used).
    """
    fields = (
        ("  Profile:        ", "Profile"),
        ("  Application ID: ", "ApplicationId"),
        ("  Reason:         ", "Reason"),
    )

    for position, hold in enumerate(get_pd_property("ActiveProfileHolds")):
        if position > 0:
            # Blank separator line between consecutive holds.
            print("")
        print("Hold:")
        for label, key in fields:
            print(label, hold[key])


@command
def _launch(args):
    """Run a command while a power profile hold is in place.

    A hold is requested from the daemon (HoldProfile), the command is started
    as a child process, SIGTERM/SIGINT/SIGABRT received by tlpctl are
    forwarded to the child while it runs, and the hold is released
    (ReleaseProfile) afterwards - also when starting or waiting for the child
    fails. tlpctl then exits with the child's exit status, or terminates
    itself with the same signal if the child was killed by one.

    Args:
        args: Parsed arguments with the attributes
            ``arguments`` (command and its arguments, must not be empty),
            ``profile`` (defaults to "performance"),
            ``reason`` (defaults to "Running <arguments>") and
            ``appid`` (defaults to the command name).

    Raises:
        ValueError: If no command was given.
    """
    if not args.arguments:
        raise ValueError("No command to launch")

    program = args.arguments[0]
    profile = args.profile or "performance"
    appid = args.appid or program
    reason = args.reason or f"Running {args.arguments}"
    ret = 0

    proxy = get_proxy(PD_INTERFACE_NAME)
    cookie = proxy.HoldProfile("(sss)", profile, reason, appid)

    # Run the command
    try:
        with subprocess.Popen(args.arguments) as launched_app:
            # Redirect the same signal to the child
            def receive_signal(signum, _stack):
                launched_app.send_signal(signum)

            redirected_signals = [
                signal.SIGTERM,
                signal.SIGINT,
                signal.SIGABRT,
            ]

            for sig in redirected_signals:
                signal.signal(sig, receive_signal)

            try:
                launched_app.wait()
            except KeyboardInterrupt:
                # Leaving the 'with' block waits for the child, so the
                # return code is read only after that.
                pass
            finally:
                # Back to the default actions so that re-raising the child's
                # signal below really terminates this process.
                for sig in redirected_signals:
                    signal.signal(sig, signal.SIG_DFL)

        # The child has been waited for at this point; returncode is set.
        ret = launched_app.returncode

    except FileNotFoundError as error:
        # Report the command that was not found, not the (possibly
        # user-supplied) application ID.
        sys.stderr.write(f"Error: command '{program}' not found\n{error}\n")
        ret = errno.ENOENT

    finally:
        # Never leave the hold behind, whatever happened above.
        proxy.ReleaseProfile("(u)", cookie)

    if ret < 0:
        # Use standard POSIX signal exit code.
        # A negative return code means the child was terminated by signal
        # number -ret; terminate this process the same way.
        os.kill(os.getpid(), -ret)
        return

    sys.exit(ret)


# --- TLP-specific commands
@command
def _performance(_args):
    """Shortcut command: activate the performance profile and confirm it.

    Args:
        _args: Parsed command-line arguments (not used).
    """
    target = "performance"
    set_profile(target)
    print("Switched to performance profile.")


@command
def _balanced(_args):
    """Shortcut command: activate the balanced profile and confirm it.

    Args:
        _args: Parsed command-line arguments (not used).
    """
    target = "balanced"
    set_profile(target)
    print("Switched to balanced profile.")


@command
def _power_saver(_args):
    """Shortcut command: activate the power-saver profile and confirm it.

    Args:
        _args: Parsed command-line arguments (not used).
    """
    target = "power-saver"
    set_profile(target)
    print("Switched to power-saver profile.")


@command
def _set_loglevel(args):
    """Write the daemon's "LogLevel" property.

    Only "info" and "debug" are accepted; any other value is reported on
    stderr and the program exits with status 1.

    Args:
        args: Parsed arguments; ``args.loglevel`` is a list whose first item
            is the requested level.
    """
    requested = args.loglevel[0]
    accepted = requested in ("info", "debug")
    if not accepted:
        sys.stderr.write(
            f"Error: invalid loglevel '{requested}'. Must be 'info' or 'debug'.\n"
        )
        sys.exit(1)

    properties = get_proxy(PROPERTIES_IFACE)
    # The "(ssv)" signature expects the new value wrapped in a variant.
    new_value = GLib.Variant.new_string(requested)
    properties.Set("(ssv)", PD_INTERFACE_NAME, "LogLevel", new_value)
    print(f"tlp-pd loglevel set to '{requested}'.")


# --- Parse arguments
def get_parser():
    """Build the argparse parser with one subparser per tlpctl command.

    Every subparser stores its handler in the ``func`` default; the name of
    the chosen command ends up in ``command``.

    Note that building the parser queries the daemon: the valid choices of
    the 'set' command are fetched with get_profile_choices().

    Returns:
        The configured argparse.ArgumentParser.
    """
    parser = argparse.ArgumentParser(
        prog=f"{PROGRAM_NAME}",
        description=f"{PROGRAM_DESC}",
        epilog=f"Use '{PROGRAM_NAME} <command> --help' to get detailed help for individual commands",
    )
    subparsers = parser.add_subparsers(help="Individual command help", dest="command")

    # --- TLP-specific profile commands (shortcuts)
    parser_performance = subparsers.add_parser(
        "performance",
        help="Switch to performance profile (shortcut for 'set performance')",
    )
    parser_performance.set_defaults(func=_performance)

    parser_balanced = subparsers.add_parser(
        "balanced", help="Switch to balanced profile (shortcut for 'set balanced')"
    )
    parser_balanced.set_defaults(func=_balanced)

    parser_power_saver = subparsers.add_parser(
        "power-saver",
        help="Switch to power-saver profile (shortcut for 'set power-saver')",
    )
    parser_power_saver.set_defaults(func=_power_saver)

    # --- Original powerprofilesctl commands
    parser_list = subparsers.add_parser("list", help="List available power profiles")
    parser_list.set_defaults(func=_list)

    # list-holds
    parser_list_holds = subparsers.add_parser(
        "list-holds", help="List current power profile holds (from launch command)"
    )
    parser_list_holds.set_defaults(func=_list_holds)

    # get
    # The help text is kept in a local variable so the description can reuse
    # it; a bare 'help' would refer to Python's builtin help object.
    get_help = "Print the currently active power profile"
    parser_get = subparsers.add_parser(
        "get", help=get_help, description=f"{get_help}."
    )
    parser_get.set_defaults(func=_get)

    # set
    parser_set = subparsers.add_parser("set", help="Set the active power profile")
    parser_set.add_argument(
        "profile",
        nargs=1,
        help="Profile to activate",
        # Valid names are read from the daemon each time the parser is built.
        choices=get_profile_choices(),
    )
    parser_set.set_defaults(func=_set_profile)

    # launch
    parser_launch = subparsers.add_parser(
        "launch",
        help="Run a command using a specific power profile (hold)",
        description="Applies the given power profile, runs the command, "
        "and returns to the original profile.",
    )
    parser_launch.add_argument(
        "arguments",
        nargs="*",
        help="Command to launch",
    )
    parser_launch.add_argument(
        "--profile",
        "-p",
        required=False,
        help="Profile that is kept in hold while the command is running",
    )
    parser_launch.add_argument(
        "--reason", "-r", required=False, help="Reason to be noted on the hold"
    )
    parser_launch.add_argument(
        "--appid", "-i", required=False, help="AppID to be noted on the hold"
    )
    parser_launch.set_defaults(func=_launch)

    # loglevel
    parser_loglevel = subparsers.add_parser(
        "loglevel", help="Set the loglevel of the tlp-pd daemon"
    )
    parser_loglevel.add_argument(
        "loglevel",
        nargs=1,
        # NOTE(review): the empty string passes argparse here but is rejected
        # later by _set_loglevel - confirm whether it is still needed.
        choices=["info", "debug", ""],
    )
    parser_loglevel.set_defaults(func=_set_loglevel)

    # version
    parser_version = subparsers.add_parser(
        "version", aliases=["--version"], help="Print version information and exit"
    )
    parser_version.set_defaults(func=_version)

    # shtab completion support (hide from help text): the option is only
    # registered when it was actually passed on the command line.
    if "--print-completion" in sys.argv:
        try:
            import shtab

            shtab.add_argument_to(parser, ["--print-completion"])

        except ImportError:
            print(
                "Warning: shtab is not installed. Install with: pip install shtab",
                file=sys.stderr,
            )

    return parser


def check_unknown_args(args, unknown_args, cmd):
    """Decide whether unrecognized arguments belong to the launched command.

    Only the 'launch' command may take over unrecognized arguments. The
    command line is walked in step with the unrecognized arguments: reaching
    the command word first means they may be taken over; finding an
    unrecognized argument at its own position first means it was given
    before the command and must be rejected.

    Args:
        args: The full argument list as given on the command line.
        unknown_args: Arguments the parser did not recognize.
        cmd: Name of the selected command.

    Returns:
        True if the unrecognized arguments may be appended to the command to
        launch, False otherwise.
    """
    if cmd == "launch":
        for position, stray in enumerate(unknown_args):
            given = args[position]
            if given == cmd:
                return True
            if given == stray:
                return False
        return True

    return False


# --- MAIN
def main():
    """Parse the command line and run the selected command.

    Without a command a brief profile list is printed. For 'launch',
    unrecognized arguments given after the command are appended to the
    command to run; any other unrecognized arguments are a usage error.
    """
    parser = get_parser()
    args, unknown = parser.parse_known_args()

    if not args.command:
        # Default behavior is to print a brief profile list if no command is given
        list_profiles_brief()
        return

    if check_unknown_args(sys.argv[1:], unknown, args.command):
        args.arguments += unknown
        unknown = []

    if unknown:
        template = argparse._("unrecognized arguments: %s")
        parser.error(template % " ".join(unknown))

    # Run given command
    args.func(args)


# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
